{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 10 Minutes to Dask-XGBoost\n",
    "\n",
    "The [RAPIDS Fork of Dask-XGBoost](https://github.com/rapidsai/dask-xgboost/ \"RAPIDS Dask-XGBoost\") lets XGBoost train on the distributed CUDA DataFrame via Dask-cuDF. A user passes Dask-XGBoost a reference to a distributed cuDF object and starts a training session over an entire cluster from Python. [The RAPIDS Fork of XGBoost](https://github.com/rapidsai/xgboost \"RAPIDS XGBoost\") enables XGBoost with the CUDA DataFrame, and we are actively working to unify all of this functionality into a single API consumable from [DMLC XGBoost](https://github.com/dmlc/xgboost \"DMLC XGBoost\")."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Disable NCCL P2P. Only necessary for versions of NCCL < 2.4"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%env NCCL_P2P_DISABLE=1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Import necessary modules and initialize the Dask-cuDF Cluster"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook uses `LocalCUDACluster` from Dask-CUDA to instantiate a single-node cluster.\n",
    "\n",
    "A user may instantiate a Dask-cuDF cluster like this:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import cudf\n",
    "import dask\n",
    "import dask_cudf\n",
    "import dask_xgboost\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "\n",
    "from dask.distributed import Client, wait\n",
    "from dask_cuda import LocalCUDACluster\n",
    "\n",
    "import subprocess\n",
    "\n",
    "cmd = \"hostname --all-ip-addresses\"\n",
    "process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)\n",
    "output, error = process.communicate()\n",
    "IPADDR = str(output.decode()).split()[0]\n",
    "\n",
    "cluster = LocalCUDACluster(ip=IPADDR)\n",
    "client = Client(cluster)\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note the use of `from dask_cuda import LocalCUDACluster`. [Dask-CUDA](https://github.com/rapidsai/dask-cuda) is a lightweight set of utilities for setting up a Dask cluster. The calls above instantiate a Dask-cuDF cluster in a single-node environment. To instantiate a multi-node Dask-cuDF cluster, a user must use `dask-scheduler` and `dask-cuda-worker`, command-line utilities that launch the scheduler and its associated workers."
   ]
  },
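  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a rough sketch of the multi-node case, assuming a `dask-scheduler` process is already running and one or more `dask-cuda-worker` processes have been pointed at it, the client connects by address instead of creating a `LocalCUDACluster`. The address below is a hypothetical placeholder, not a value from this notebook; 8786 is the default Dask scheduler port. The snippet is shown as text rather than as a code cell so that running the notebook leaves the single-node `client` above in place.\n",
    "\n",
    "```python\n",
    "from dask.distributed import Client\n",
    "\n",
    "# Hypothetical address: replace with the one printed by `dask-scheduler` at startup.\n",
    "SCHEDULER_ADDRESS = \"tcp://10.0.0.1:8786\"\n",
    "\n",
    "# Attach to the existing scheduler; the workers registered with it do the GPU work.\n",
    "client = Client(SCHEDULER_ADDRESS)\n",
    "client\n",
    "```"
   ]
  },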
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Initialize a Random Dataset"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Build a random dataset in pandas, convert it to a `dask_cudf.DataFrame`, and use `dask_cudf.DataFrame.query` to split it into train and test sets."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "size = 1000000\n",
    "npartitions = 8\n",
    "\n",
    "pdf = pd.DataFrame({'x': np.random.randint(0, npartitions, size=size), 'y': np.random.normal(size=size)})\n",
    "pdf = dask.dataframe.from_pandas(pdf, npartitions=npartitions)\n",
    "\n",
    "ddf = dask_cudf.from_dask_dataframe(pdf)\n",
    "\n",
    "x_train = ddf.query('y < 0.5')\n",
    "y_train = x_train[['x']]\n",
    "x_train = x_train[x_train.columns.difference(['x'])]\n",
    "\n",
    "x_test = ddf.query('y > 0.5')\n",
    "y_test = x_test[['x']]\n",
    "x_test = x_test[x_test.columns.difference(['x'])]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Define Parameters and Train with XGBoost"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use `dask_cudf.DataFrame.persist()` to ensure each GPU worker owns its share of the data before training, which gives better load balance. This step is optional."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "params = {\n",
    "  'num_rounds':   100,\n",
    "  'max_depth':    8,\n",
    "  'max_leaves':   2**8,\n",
    "  'n_gpus':       1,\n",
    "  'tree_method':  'gpu_hist',\n",
    "  'objective':    'reg:squarederror',\n",
    "  'grow_policy':  'lossguide'\n",
    "}\n",
    "\n",
    "## Optional: persist training data into memory\n",
    "# x_train = x_train.persist()\n",
    "# y_train = y_train.persist()\n",
    "\n",
    "bst = dask_xgboost.train(client, params, x_train, y_train, num_boost_round=params['num_rounds'])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Inputs for `dask_xgboost.train`\n",
    "\n",
    "1. `client`: the `dask.distributed.Client`\n",
    "2. `params`: the training parameters for XGBoost. Setting `'n_gpus': 1` is required: it tells Dask-cuDF that each worker has a single GPU with which to perform coordinated computation\n",
    "3. `x_train`: an instance of `dask_cudf.DataFrame` containing the training features\n",
    "4. `y_train`: the labels for the training data; in this notebook, the single-column `dask_cudf.DataFrame` selected with `x_train[['x']]`\n",
    "5. `num_boost_round=params['num_rounds']`: the number of boosting rounds for the training session"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Compute Predictions and the RMSE Metric for the Model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use `dask.dataframe.multi.concat` to build a `dask_cudf.DataFrame` from a list of `dask_cudf.Series`, which gives a cleaner API for computing the RMSE."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pred = dask_xgboost.predict(client, bst, x_test)\n",
    "test = dask.dataframe.multi.concat([pred], axis=1)\n",
    "\n",
    "test['squared_error'] = (test[0] - y_test['x'])**2"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### How to run prediction via `dask_xgboost.predict`\n",
    "\n",
    "1. `client`: the `dask.distributed.Client`\n",
    "2. `bst`: the Booster produced by the XGBoost training session\n",
    "3. `x_test`: an instance of `dask_cudf.DataFrame` containing the data to run inference on (that is, to acquire predictions for)\n",
    "\n",
    "`pred` will be an instance of `dask_cudf.Series`.\n",
    "\n",
    "We can use `dask.dataframe.multi.concat` to construct a `dask_cudf.DataFrame` by concatenating the list of `dask_cudf.Series` instances (`[pred]`).\n",
    "\n",
    "`test` is a `dask_cudf.DataFrame` object with a single column named `0`; that is, `test[0]` returns `pred`. To compute the root-mean-squared error (RMSE), we first construct a new column holding the squared difference between the predicted and labeled values. This is encoded in the assignment `test['squared_error'] = (test[0] - y_test['x'])**2`.\n",
    "\n",
    "Finally, the mean can be computed by using an aggregator from the `dask_cudf` API. The entire computation is initiated via `.compute()`. We take the square root of the result, leaving us with `rmse = np.sqrt(test.squared_error.mean().compute())`. Note: `.squared_error` is an accessor for `test['squared_error']`. Like so:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "rmse = np.sqrt(test.squared_error.mean().compute())\n",
    "print('rmse value:', rmse)"
   ]
  },
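  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the arithmetic concrete, here is a minimal sketch of the same sequence (squared error, mean, square root) on two small NumPy arrays. The values are hypothetical and chosen so the result is easy to check by hand: the squared errors are 0, 0, 4 and 0, their mean is 1, and so the RMSE is 1."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "# Hypothetical toy labels and predictions, not taken from the model above\n",
    "toy_labels = np.array([1.0, 2.0, 3.0, 4.0])\n",
    "toy_preds = np.array([1.0, 2.0, 5.0, 4.0])\n",
    "\n",
    "toy_squared_error = (toy_preds - toy_labels)**2   # per-row squared difference\n",
    "toy_rmse = np.sqrt(toy_squared_error.mean())      # square root of the mean squared error\n",
    "print('toy rmse value:', toy_rmse)"
   ]
  },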
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
